Skip to content

feat: support kafka consumer for pubsub scenario#6995

Closed
bzp2010 wants to merge 98 commits intoapache:masterfrom
bzp2010:feat-kafka
Closed

feat: support kafka consumer for pubsub scenario#6995
bzp2010 wants to merge 98 commits intoapache:masterfrom
bzp2010:feat-kafka

Conversation

@bzp2010
Copy link
Copy Markdown
Contributor

@bzp2010 bzp2010 commented May 6, 2022

Description

Support kafka consumers for publish-subscribe scenarios.

Implement proposal # (6874)

Checklist

  • I have explained the need for this PR and the problem it solves
  • I have explained the changes or the new features added to this PR
  • I have added tests corresponding to this change
  • I have updated the documentation to reflect this change
  • I have verified that this change is backward compatible (If not, please discuss on the APISIX mailing list first)

@bzp2010 bzp2010 requested a review from spacewander May 10, 2022 19:43
-- Its second return value is a string type error message, no need to return when
-- no error exists.
--
-- @function core.pubsub.on
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add some @tparam

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed #7043

end

-- skip this loop for non-fatal errors
log.error("failed to receive websocket frame: "..err)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.error("failed to receive websocket frame: "..err)
log.error("failed to receive websocket frame: ", err)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

-- terminate the event loop when a fatal error occurs
if ws.fatal then
ws:send_close()
return "websocket server: "..err
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to use break in the while loop and handle the error in the same place

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

-- the pub-sub messages use binary, if the message is not
-- binary, skip this message
if raw_type ~= "binary" then
goto continue
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to add warn log for unexpected input

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

goto continue
end

local data = pb.decode("PubSubReq", raw_data)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check decode error?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkd

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

"enable_tls": true,
"ssl_verify": true,
"enable_sasl": true,
"sasl_username": "user",
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the example when the code part is accepted.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed #7046

ngx.say(body)
}
}
--- error_code: 200
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to check the default error_code

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed #7032

pb.option("int64_as_string")
local pubsub_protoc = protoc.new()
pubsub_protoc:addpath("apisix/include/apisix/model")
local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bad indent?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolved in #7028

--- config
location /t {
content_by_lua_block {
local protoc = require("protoc")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can exact the common part into helper, and put it in https://github.com/apache/apisix/tree/master/t/lib?

Copy link
Copy Markdown
Contributor Author

@bzp2010 bzp2010 May 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

separated #7028

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolved #7028

1failed to fetch message, topic: not-exist, partition: 0, err: not found topic
2offset: 0
3offset: 30
4offset: 14 msg: testmsg15
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a comment about where the msg is from

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed #7032

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed #7032

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants